package com.dec.kafka.offset.reset;

import com.google.gson.Gson;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka offset reset tool.
 *
 * <p>Resets the committed offset of a configured consumer group / topic /
 * partition to a configured value. All settings are loaded from a
 * {@code kafka.properties} file by {@link #loadConfig(String)}. The optional
 * {@link #consume()} mode scans the topic for the first record whose data time
 * is after a configured start date and resets the group offset to that record.
 *
 * <p>NOTE(review): state lives in mutable static fields — this is a one-shot
 * command-line utility and is not thread-safe.
 */
public class KafkaOffsetReset {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaOffsetReset.class);

    /** Shared consumer, created by {@link #initConsumer()}. */
    static KafkaConsumer<String, String> consumer;
    static Properties props = new Properties();

    // Defaults below are placeholders; loadConfig() overwrites every one of them.
    static String group_id = "test-dec-rt-dealed-t00101";
    static String topic = "test-dec-rt-dealed-t00101";
    static String bootstrap_servers = "bigdata-slave1.phmcluster.calabar:6667,bigdata-slave2.phmcluster.calabar:6667,bigdata-slave3.phmcluster.calabar:6667";
    static String offset_reset_config = "earliest";

    static int partition = 0;
    static int offset = 10;
    static int is_reset = 1;
    static int fetch_request_offset = 1;
    static long start_date_time = 1L; // fixed: was "1l" — lowercase suffix is easily misread as 11
    static String start_date = "2018-09-12 22:30:10";

    public static void main(String[] args) throws Exception {
        loadConfig("/home/hdfs/soft/IdeaProjects/dec-kks-etl/dec-kafka-offset-reset/src/main/resources");

        initConsumer();

        reset();

        // Optional mode: scan the topic for the first record after start_date
        // and reset to that record's offset instead of the configured one.
//        if (fetch_request_offset == 1) {
//            consume();
//        }

    }

    /**
     * Scans {@link #topic} from the group's current position looking for the
     * first record whose {@code collect_time} (seconds) converted to millis is
     * strictly after {@link #start_date_time}; when found, commits that
     * record's offset for the group and exits the JVM.
     *
     * <p>Loops forever if no qualifying record arrives.
     *
     * @throws InterruptedException declared for API compatibility
     */
    public static void consume() throws InterruptedException {
        consumer.subscribe(Arrays.asList(topic));
        Gson gson = new Gson();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                KKSEntity kks = gson.fromJson(record.value(), KKSEntity.class);
                long data_time = kks.getCollect_time();
                // collect_time is in seconds, start_date_time in millis; the
                // original boxed compareTo(...) == 1 is just a ">" on longs.
                if (data_time * 1000L > start_date_time) {
                    // Kept at ERROR level so the hit stands out even under a
                    // restrictive log configuration (matches original behavior).
                    LOGGER.error("找到起始偏移量：{}，当前数据时间：{}==>{}",
                            record.offset(), kks.getCollect_time(),
                            DateFormatUtils.format(data_time * 1000L, "yyyy-MM-dd HH:mm:ss"));
                    offset = (int) record.offset();
                    reset();
                    // NOTE(review): exits with status 1 even though this is the
                    // success path — confirm callers expect a nonzero code.
                    System.exit(1);
                } else {
                    LOGGER.warn("当前偏移量：{}，寻找偏移量->{}",
                            record.offset(),
                            DateFormatUtils.format(data_time * 1000L, "yyyy-MM-dd HH:mm:ss"));
                }
            }
        }

    }

    /**
     * Commits {@code offset} for the given topic/partition on behalf of the
     * consumer's group, i.e. moves the group's committed position.
     *
     * @param consumer  consumer belonging to the target group
     * @param topic     topic whose offset is reset
     * @param group_id  stored as the commit metadata string only
     * @param partition partition to reset
     * @param offset    offset to commit for the group
     * @throws InterruptedException declared for API compatibility
     */
    public static void reset(KafkaConsumer<String, String> consumer,
                             String topic,
                             String group_id,
                             int partition,
                             int offset) throws InterruptedException {
        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsetAndMetadataMap.put(new TopicPartition(topic, partition),
                new OffsetAndMetadata(offset, group_id));
        consumer.commitSync(offsetAndMetadataMap);
    }

    /**
     * Builds a consumer from the given settings and assigns it to
     * {@link #consumer}.
     *
     * <p>Fix: the original built the config but never created the consumer,
     * leaving {@link #consumer} untouched — the method was a silent no-op.
     *
     * @param offset_reset_config auto.offset.reset policy (e.g. "earliest")
     * @param group_id            consumer group to operate on
     * @param bootstrap_servers   comma-separated broker list
     */
    public static void initConsumer(String offset_reset_config,
                                    String group_id,
                                    String bootstrap_servers) {
        Properties config = new Properties();
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset_reset_config);
        // One record per poll keeps the consume() scan fine-grained.
        config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap_servers);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // Fix: auto-commit must be OFF — this tool commits offsets explicitly
        // via commitSync(); auto-commit during the consume() scan would silently
        // advance the group offset and defeat the reset.
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumer = new KafkaConsumer<String, String>(config);
    }


    /**
     * Resets the group offset using the statically configured
     * topic/partition/offset/group (delegates to the parameterized overload).
     *
     * @throws InterruptedException declared for API compatibility
     */
    public static void reset() throws InterruptedException {
        reset(consumer, topic, group_id, partition, offset);
    }

    /**
     * Creates {@link #consumer} from the statically configured settings
     * (delegates to the parameterized overload).
     */
    public static void initConsumer() {
        initConsumer(offset_reset_config, group_id, bootstrap_servers);
    }

    /**
     * Loads {@code kafka.properties} from the given directory and populates
     * the static configuration fields.
     *
     * @param confPath directory containing {@code kafka.properties}
     * @throws Exception if the file cannot be read or a property is malformed
     */
    public static void loadConfig(String confPath) throws Exception {
        try {
            if (!confPath.endsWith(File.separator)) {
                confPath = confPath + File.separator;
            }
            confPath = confPath + "kafka.properties";
            // Fix: try-with-resources — the original leaked the FileInputStream.
            try (FileInputStream in = new FileInputStream(confPath)) {
                props.load(in);
            }

            group_id = props.getProperty("group_id");
            topic = props.getProperty("topic");
            bootstrap_servers = props.getProperty("bootstrap_servers");
            offset_reset_config = props.getProperty("offset_reset_config");

            start_date = props.getProperty("start_date");
            start_date_time = DateUtils.parseDate(start_date, "yyyy-MM-dd HH:mm:ss").getTime();

            // parseInt instead of valueOf: the targets are primitive ints.
            partition = Integer.parseInt(props.getProperty("partition"));
            offset = Integer.parseInt(props.getProperty("offset"));

            is_reset = Integer.parseInt(props.getProperty("is_reset"));
            fetch_request_offset = Integer.parseInt(props.getProperty("fetch_request_offset"));

        } catch (Exception e) {
            // Original cause is preserved on the rethrow.
            LOGGER.error("配置文件加载路径不正确!" + confPath, e);
            throw new Exception("配置文件加载路径不正确!" + confPath, e);
        }
    }
}
